Spark API 文档
1. 创建和写入 LakeSoulTable
1.1 Table Name
LakeSoul 中表名可以是一个路径,数据存储的目录就是 LakeSoulTable 的表名。同时一个表可以有一个表名帮助记忆,或在SQL中访问,即不是路径形式的一个字符串。
当调用 Dataframe.write.save 方法向 LakeSoulTable 写数据时,若表不存在,则会使用存储路径自动创建新表,但是默认没有表名,只能通过路径访问,可以通过添加 option("shortTableName", "table_name")
选项来设置表名。
通过 DataFrame.write.saveAsTable,会创建表,可以通过表名访问,路径默认为 spark.sql.warehouse.dir
/current_database/table_name,后续可以通过路径或表名访问。如需自定义表路径,则可以加上 option("path", "s3://bucket/...")
选项。
通过 SQL 建表时,表名可以是路径或一个表名,路径必须是绝对路径。如果是表名,则路径的规则和上面 Dataframe.write.saveAsTable 一致,可以在 CREATE TABLE
SQL 中通过 LOCATION 子句设置。关于如何在 SQL 中创建主键分区表,可以参考 7. 使用 Spark SQL 操作 LakeSoul 表
1.2 元数据管理
LakeSoul 通过数据是管理 meta 数据,因此可以高效的处理元数据,并且 meta 集群可以很方便的在云上进行扩容。
1.3 Partition
LakeSoulTable 有两种分区方式,分别是 range 分区和 hash 分区,可以两种分区同时使用。
- range 分区即通常的基于时间的表分区,不同分区的数据文件存储在不同的分区路径下;
- 使用 hash 分区,必须同时指定 hash 分区主键字段和 hash bucket num,在写数据时,会根据 bucket num 对 hash 主键字段值进行散列,取模后相同数据会写到同一个文件,文件内部根据 hash 字段值升序排列;
- 若同时指定了 range 分区和 hash 分区,则每 个 range 分区内,hash 值相同的数据会写到同一个文件里;
- 指定分区后,写入 LakeSoulTable 的数据必须包含分区字段。
可以根据具体场景选择使用 range 分区或 hash 分区,或者同时使用两者。当指定 hash 分区后,LakeSoulTable 的数据将根据主键唯一,主键字段为 hash 分区字段 + range 分区字段(如果存在)。
当指定 hash 分区时,LakeSoulTable 支持 upsert 操作 (scala/sql),此时 append 模式写数据被禁止,可以使用 LakeSoulTable.upsert()
方法或者 MERGE INTO
SQL 语句。
1.4 代码示例
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
// 使用 SQL 功能还需要增加以下两个配置项
.config("spark.sql.catalog.lakesoul", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
.config("spark.sql.defaultCatalog", "lakesoul")
.getOrCreate()
import spark.implicits._
val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
//create table
//spark batch
df.write
.mode("append")
.format("lakesoul")
.option("rangePartitions","date")
.option("hashPartitions","id")
.option("hashBucketNum","2")
.save(tablePath)
//spark streaming
import org.apache.spark.sql.streaming.Trigger
val readStream = spark.readStream.parquet("inputPath")
val writeStream = readStream.writeStream
.outputMode("append")
.trigger(Trigger.ProcessingTime("1 minutes"))
.format("lakesoul")
.option("rangePartitions","date")
.option("hashPartitions","id")
.option("hashBucketNum", "2")
.option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
.start(tablePath)
writeStream.awaitTermination()
//对于已存在的表,写数据时不需要再指定分区信息
//相当于 insert overwrite partition,如果不指定 replaceWhere,则会重写整张表
df.write
.mode("overwrite")
.format("lakesoul")
.option("replaceWhere","date='2021-01-01'")
.save(tablePath)
2. Read LakeSoulTable
可以通过 Spark read api 或者构建 LakeSoulTable 来读取数据,LakeSoul 也支持通过 Spark SQL 读取数据,详见 7. 使用 Spark SQL 操作 LakeSoulTable
2.1 代码示例
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
.getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
//方法一
val df1 = spark.read.format("lakesoul").load(tablePath)
//方法二
val df2 = LakeSoulTable.forPath(tablePath).toDF